import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

/// Test page that registers the `StreamCloser` scenarios: a closer that is
/// never closed, a closer that is closed after a stream was transformed by
/// it, and a closer that is closed before a stream is transformed by it.
///
/// `group`, `test` and `expect` are not declared in this file (presumably
/// they are inherited from [TestPage]). `expect` is always called with a
/// single value and no matcher, so what it does with that value cannot be
/// seen from here.
class StreamCloserTestPage extends TestPage {
  /// Registers every group and test case of this page.
  ///
  /// Each test builds its own [StreamCloser] instead of sharing a single
  /// `late` variable across all closures, so a test that suspends on an
  /// `await` can never resume against a closer created by another test.
  StreamCloserTestPage(super.title) {
    group('StreamCloser()当关闭器从未关闭时', () {
      test('转发数据和已完成的事件', () {
        var closer = StreamCloser<int>();
        expect(createStream().transform(closer).toList());
      });

      test('转发错误事件', () {
        var closer = StreamCloser<int>();
        expect(Stream<int>.error('oh no').transform(closer).toList());
      });

      test('将广播流转换为广播流', () {
        var closer = StreamCloser<int>();
        expect(Stream<int>.empty().transform(closer).isBroadcast);
      });

      test("没有急切地倾听", () {
        var closer = StreamCloser<int>();
        var controller = StreamController<int>();
        var transformed = controller.stream.transform(closer);
        // Reported once before and once after the transformed stream gets a
        // listener.
        expect(controller.hasListener);

        transformed.listen(null);
        expect(controller.hasListener);
      });

      test('转发暂停和恢复', () {
        var closer = StreamCloser<int>();
        var controller = StreamController<int>();
        var transformed = controller.stream.transform(closer);

        var subscription = transformed.listen(null);
        // Reported after listening, after pausing and after resuming.
        expect(controller.isPaused);
        subscription.pause();
        expect(controller.isPaused);
        subscription.resume();
        expect(controller.isPaused);
      });

      test('远期取消', () {
        var closer = StreamCloser<int>();
        var isCancelled = false;
        var controller =
            StreamController<int>(onCancel: () => isCancelled = true);
        var transformed = controller.stream.transform(closer);

        // Reported before listening, after listening and after cancelling.
        expect(isCancelled);
        var subscription = transformed.listen(null);
        expect(isCancelled);
        subscription.cancel();
        expect(isCancelled);
      });

      test('转发来自取消的错误', () {
        var closer = StreamCloser<int>();
        // The source controller throws from its cancel callback on purpose.
        var controller = StreamController<int>(onCancel: () => throw 'oh no');

        expect(
            controller.stream.transform(closer).listen(null).cancel());
      });
    });

    group('StreamCloser()当在关闭关闭器StreamCloser().close()之前添加流时', () {
      test('StreamQueue() 关闭关闭器后，流会发出关闭事件', () async {
        var closer = StreamCloser<int>();
        var queue = StreamQueue(createStream().transform(closer));
        expect(queue);
        expect(queue);
        expect(closer.close());
        expect(queue);
      });

      test('关闭关闭后，内部订阅将被取消', () {
        var closer = StreamCloser<int>();
        var isCancelled = false;
        var controller =
            StreamController<int>(onCancel: () => isCancelled = true);

        expect(controller.stream.transform(closer));
        expect(closer.close());
        expect(isCancelled);
      });

      test('StreamCloser().close()转发StreamSubscription.cancel()中的错误', () {
        var closer = StreamCloser<int>();
        var controller = StreamController<int>(onCancel: () => throw 'oh no');

        expect(controller.stream.transform(closer));
        expect(closer.close());
      });

      // NOTE(review): this title and the next one say "closer()", but the
      // method exercised is close(). The titles are left untouched in case
      // they are used as keys elsewhere.
      test('StreamCloser().closer()即使流已经完成也能工作', () async {
        var closer = StreamCloser<int>();
        // The transformed stream is drained completely before closing.
        expect(await createStream().transform(closer).toList());
        expect(closer.close());
      });

      test('StreamCloser().closer()即使流已经被取消也能工作', () async {
        var closer = StreamCloser<int>();
        // The cancellation future is intentionally not awaited; only the
        // later close() call is reported.
        unawaited(createStream().transform(closer).listen(null).cancel());
        expect(closer.close());
      });

      test('输出流立即发出done', () {
        var closer = StreamCloser<int>();
        var stream = createStream().transform(closer);
        expect(closer.close());
        expect(stream);
      });

      test('如果从不侦听流，则从不侦听基础订阅', () async {
        var closer = StreamCloser<int>();
        var controller = StreamController<int>(onListen: () {});
        // The transformed stream is deliberately dropped without a listener.
        controller.stream.transform(closer);

        expect(closer.close());

        await pumpEventQueue();
      });

      test('基础订阅被侦听，然后在流被侦听后被取消', () {
        var closer = StreamCloser<int>();
        var controller =
            StreamController<int>(onListen: () {}, onCancel: () {});
        var stream = controller.stream.transform(closer);

        expect(closer.close());

        // Listening happens only after the closer has been closed.
        stream.listen(null);
      });

      test('Subscription.cancel()错误被静默忽略', () async {
        var closer = StreamCloser<int>();
        var controller = StreamController<int>(onCancel: () => throw 'oh no');
        var stream = controller.stream.transform(closer);

        expect(closer.close());

        stream.listen(null);
        await pumpEventQueue();
      });
    });

    group('StreamCloser()关闭关闭后添加流时', () {
      test('.transform() 输出流立即发出done', () {
        var closer = StreamCloser<int>();
        expect(closer.close());
        expect(createStream().transform(closer));
      });

      test('如果从不侦听流，则从不侦听基础订阅', () async {
        var closer = StreamCloser<int>();
        expect(closer.close());

        var controller = StreamController<int>(onListen: () {});
        // The transformed stream is deliberately dropped without a listener.
        controller.stream.transform(closer);

        await pumpEventQueue();
      });

      test('基础订阅被侦听，然后在流被侦听后被取消', () {
        var closer = StreamCloser<int>();
        expect(closer.close());

        var controller = StreamController<int>(
            onListen: () {}, onCancel: () {});

        controller.stream.transform(closer).listen(null);
      });

      test('Subscription.cancel()错误被静默忽略', () async {
        var closer = StreamCloser<int>();
        expect(closer.close());

        var controller = StreamController<int>(onCancel: () => throw 'oh no');

        controller.stream.transform(closer).listen(null);

        await pumpEventQueue();
      });
    });
  }
}

/// Produces the integers 1 through 4 in ascending order.
///
/// Between two consecutive values the generator awaits `flushMicrotasks()`
/// (declared outside this file; presumably it lets pending work run before
/// the next value is produced). Nothing is awaited after the final value.
Stream<int> createStream() async* {
  const lastValue = 4;
  for (var value = 1; value <= lastValue; value++) {
    yield value;
    // Flush only between values, never after the last one.
    if (value != lastValue) {
      await flushMicrotasks();
    }
  }
}
